package com.atguigu.flink.checkpoint;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;

/**
 * Created by Smexy on 2023/11/18
 *
 * End-to-end exactly-once semantics (EOS):
 *    KafkaSource (replayable read) ----> checkpoint ----> KafkaSink (transactional write)
 *
 *    KafkaSource restores offsets from its operator state first; the
 *    OffsetsInitializer strategy below is only consulted when no offsets
 *    exist in state (i.e. a fresh start with no checkpoint/savepoint).
 *
 * -------------------
 * "Unexpected error in InitProducerIdResponse" — producer initialization failure:
 *    the client-side transaction timeout (Flink's default for EXACTLY_ONCE is 1h)
 *    exceeded the broker's maximum allowed value (default 15 min):
 *    "The transaction timeout is larger than the maximum value allowed by the broker
 *    (as configured by transaction.max.timeout.ms)."
 */
public class Demo3_KafkaEOS
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);
        // 2. Enable checkpointing: aligned-barrier, exactly-once mode (Flink's default
        //    CheckpointingMode is EXACTLY_ONCE), every 5 seconds.
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck3");

        // 1. Source side must be replayable; Kafka persists the data durably.
        KafkaSource<String> kafkaSource = KafkaSource
            .<String>builder()
            .setBootstrapServers("hadoop102:9092,hadoop103:9092")  // Kafka cluster address
            .setTopics("t1")
            .setGroupId("test3")
            // Fallback strategy only — used when no offsets are found in operator state.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // NOTE(review): Kafka-client auto-commit runs OUTSIDE Flink's checkpoint
            // cycle; under EOS, recovery offsets come from checkpointed state, so these
            // group commits are informational only (e.g. for lag monitoring).
            .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")
            .setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")
            // Read only committed transactional records from t1: prevents dirty reads
            // of data written by producers whose transactions later abort.
            .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed")
            .build();


        // 3. Sink side uses a two-phase-commit (2PC) capable sink: transactional writes, EOS.
        KafkaSink<String> kafkaSink = KafkaSink
            .<String>builder()
            // Same broker list as the source — both talk to the same cluster, and
            // listing multiple brokers keeps the producer usable if one is down.
            .setBootstrapServers("hadoop102:9092,hadoop103:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                                              // Serialize the element as the record value only (no key).
                                              .setValueSerializationSchema(new SimpleStringSchema())
                                              .setTopic("t2")
                                              .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // Lets Kafka detect duplicate transaction commits; each transaction gets a
            // unique id derived from this prefix, also cached in operator state.
            .setTransactionalIdPrefix("atguigu-")
            // Extra producer settings; parameter names come from ProducerConfig.
            .setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "200")
            .setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000")
            // Client-side transaction timeout: must NOT exceed the broker's
            // transaction.max.timeout.ms (default 15 min), and should be longer than
            // the checkpoint interval so in-flight transactions survive a checkpoint.
            .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10 * 60 * 1000 +"")
            .build();

        DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");

        ds.sinkTo(kafkaSink);

        // Second sink: slows the stream down and injects a failure on demand so the
        // EOS recovery path (transaction abort + replay from checkpoint) can be observed.
        ds.addSink(new SinkFunction<String>()
        {
            @Override
            public void invoke(String value, Context context) throws Exception {
                Thread.sleep(2000);
                if (value.contains("s10")){
                    throw new RuntimeException("出异常了....");
                }
                System.out.println(value);
            }
        });

        env.execute();

    }
}
